Python之路:(十四)I/O多路复用

回顾原生Socket

“他是所有WEB服务器的祖宗”

Socket,用于描述IP地址和端口,是一个通信链的句柄,可以用来实现不同虚拟机或不同计算机之间的通信。在Internet上的主机一般运行了多个服务软件,同时提供几种服务。每种服务都打开一个Socket,并绑定到一个端口上,不同的端口对应于不同的服务,这些服务通过Socket向网络发出请求或者应答网络请求。

pupepet、ansible、他们也可以通过输入命令然后返回结果这个也是基于Socket来实现的。

上一篇《初识socket》:server端他们仅能处理一个请求在有连接过来的时候,如果第一个请求在和服务器连接中,那么第二个只能等待第一个断开后第二个才能连接

socket拓扑图

过程:
第一请求发送了一个操作,server端还未返回,那么现在两头都在等待着输入。
那么这段时间第二客户端过来的请求被挂起处于等待状态!

现在服务端是不是在空闲着呢?他只占着I/O资源,CPU是不是空闲着呢?他阻塞着后面的请求无法进来。
不急继续往下看

网络IO阻塞模型

介绍:
网络I/O模型讨论的背景是Linux环境下的network IO。本文最重要的参考文献是Richard Stevens的“UNIX? Network Programming Volume 1, Third Edition: The Sockets Networking ”,6.2节“I/O Models ”,Stevens在这节中详细说明了各种IO的特点和区别,如果英文够好的话,推荐直接阅读。Stevens的文风是有名的深入浅出,所以不用担心看不懂。

什么是I/O

  1. 先了解什么是I/O:

    • I/O(input/output),即输入/输出端口。每个设备都会有一个专用的I/O地址,用来处理自己的输入输出信息。
  2. I/O models:

    • 阻塞:blocking IO
    • 非阻塞:non-blocking IO
    • 同步:synchronous IO
    • 异步:asynchronous IO
  3. IO发生时涉及的对象和步骤:以输入操作的socket为例:第一步:首先等待网络数据到达,当数据接收就会复制到内核缓冲区中,第二步:复制从内核缓冲区到应用缓冲区

    • 等待数据准备 (Waiting for the data to be ready)
    • 将数据从内核拷贝到进程中(Copying the data from the kernel to the process) 记住这两点很重要,因为这些IO模型的区别就是在两个阶段上各有不同的情况。

Blocking I/O Model

默认情况下所有的Socket是阻塞,看下面的图例:

I/O Model

当用户进程(应用程序)调用了recvfrom这个系统接口,kernel就开始了IO的第一个阶段准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如没有收到一个完整的TCP/UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝用户内存,然后kernel返回结果,用户进程才解除 block的状态,重新运行起来。

所以阻塞:blocking IO的特点是I/O执行时的两个操作(等待数据准备(Waiting for the data to be ready)、将数据从内核拷贝到进程中(Copying the data from the kernel to the process))都是阻塞的

Python socket中:accept() recv() 是阻塞的

所以,所谓阻塞型接口是指系统调用(一般是IO接口)如果不返回结果就一直阻塞,就是socket经常说的,有发就有收收发必相等如果两边都在同时收,是不是阻塞着后面的代码就无法执行?

那既然原生的Socket是阻塞的,那有什么办法来解决呢?

使用多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接

我们假设对上述的服务器 / 客户机模型,提出更高的要求,即让服务器同时为多个客户机提供一问一答的服务。于是有了如下的模型。

I/O Model

在上述的线程 / 时间图例中,主线程持续等待客户端的连接请求,如果有连接,则创建新线程,并在新线程中提供为前例同样的问答服务。

很多人可能不明白为何一个socket可以accept多次。实际上socket的设计者可能特意为多客户机的情况留下了伏笔,让accept()能够返回一个新的socket对象。

  • 执行完bind()和listen()后,操作系统已经开始在指定的端口处监听所有的连接请求,如果有请求,则将该连接请求加入请求队列。
  • 调用accept()接口正是从的请求队列抽取第一个连接信息,创建一个新的socket返回句柄。新的socket句柄即是后续read()和recv()的输入参数。如果请求队列当前没有请求,则accept()将进入阻塞状态直到有请求进入队列。

上述多线程的服务器模型似乎完美的解决了为多个客户机提供问答服务的要求,但其实并不尽然。如果要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而线程与进程本身也更容易进入假死状态。

很多程序员可能会考虑使用“线程池”或“进程池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“进程池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。

这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如webspheretomcat和各种数据库等。但是,“线程池”和“进程池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“”始终有其上限,当请求大大超过上限时,“”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“”必须考虑其面临的响应规模,并根据响应规模调整“”的大小。

对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“进程池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。

非阻塞:non-blocking IO

import time
import socket

sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)  # 创建socket对象
sk.setsockopt
sk.bind(('127.0.0.1',6666))  # 设置监听的IP与端口
sk.listen(5)  # 设置client最大等待连接数
sk.setblocking(False) # 这里设置setblocking为False accept将不在阻塞,但是如果没有收到请求就会报错

while True:  # 循环
    try:
        print('waiting client connection .......')
        # accept()接受客户端发送过来的请求:connection代表客户端对象,address是客户端的IP
        connection, address = sk.accept()
        # recv()接收客户端信息
        client_messge = connection.recv(1024)
        # 打印客户端信息
        print('client send %s' % client_messge)
        # 发送回执信息给client
        connection.sendall(bytes('僵尸吃了你的脑子!!!', encoding='utf-8'))
        # connection.send()
        # 关闭和client的连接
        connection.close()
    except Exception as error:
        print(error)
    time.sleep(4)

看上面的代码,修改了setblocking的值,那么现在accept()将不再阻塞。所以他类似下面的图:

I/O Model

EWOULDBLOCK 意思是说:该操作可能会被阻塞。WOULD BLOCK是可能会被阻塞的意思。

从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从 用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次 发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。 非阻塞的接口相比于阻塞型接口的显著差异在于:在被调用之后立即返回。python中的 sk.setblocking(False) accept() 将不会阻塞

多路复用IO(IO multiplexing)

IO multiplexing这个词可能有点陌生,但是如果我说select/epoll/poll,大概就都能明白了。有些地方也称这种IO方式为事件驱动IO(event driven IO)。我们都知道,select/epoll/poll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll/poll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:

I/O Model

I/O多路复用指:通过一种机制,可以监视多个描述符,监听的描述符发生了改变,可读了或者可写了,一旦他发生了改变,那我就可以得到一个回调信息或者我主动的去,去知道系统发生变化了,达到一个上下文切换的目的!

Python中有一个select模块,其中提供了:select、poll、epoll三个方法,分别调用系统的 select,poll,epoll 从而实现IO多路复用。

select:

select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。
select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。

poll:

poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。
poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

epoll:

直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。(回调机制决定了epoll不需要像select、poll那样循环得方式去检查文件描述符是否发生改变,而是在一个文件描述符上加一层回调,在使用这个文件描述符得时候会被激活得到通知。)

根据系统不同:他支持的也不同:

Windows Python:
    提供: kqueue、select
Mac Python:
    提供: kqueue、select
Linux Python:
    提供: select、poll、epoll、kqueue

注意:网络操作、文件操作、终端操作等均属于IO操作,对于windows只支持Socket操作,其他系统支持其他IO操作,但是无法检测 普通文件操作 自动上次读取是否已经变化。

对于Select

句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超时时间)

参数: 可接受四个参数(前三个必须)
返回值:三个列表

select方法用来监视文件句柄,如果句柄发生变化,则获取该句柄。
1、当 参数1 序列中的句柄发生可读时(accetp和read),则获取发生变化的句柄并添加到 返回值1列表中
2、当 参数2 序列中含有句柄时,则将该序列中所有的句柄添加到 返回值2列表中
3、当 参数3 序列中的句柄发生错误时,则将该发生错误的句柄添加到 返回值3列表中
4、当 超时时间 未设置,则select会一直阻塞,直到监听的句柄发生变化
5、当 超时时间 = 1时,那么如果监听的句柄均无任何变化,则select会阻塞 1 秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。
利用select监听终端操作实例:
import select
import sys

while True:
    readable, writeable, error = select.select([sys.stdin, ], [], [], 1)
    '''select.select([sys.stdin,],[],[],1)用到I/O多路复用,第一个参数是列表,我放进去的是stdin就是我输入进去东西的描述符,
       相当于打开一个文件,和obj = socket(),类似的文件描述符,
       sys.stdin 他只是一个特殊的文件描述符= 终端的输入,一旦你输入OK select I/O多路复用他就感知到了。
       先看readable这个参数,其他的先不用看一旦你发生了我就他他放到readable里了,readable是一个列表,
       这里添加的就是修改的那个文件描述符,如果你一直没有修改过,那么readable他就是一个空的列表
    '''
    if sys.stdin in readable:
        message = sys.stdin.readline()
        print('select get stdin %s' % message)

'''
注:
1、[sys.stdin,]  以后不管是列表还是元组在最后的元素后面建议增加一个逗号,拿元组举例(1,) | (1) 这两个有区别吗?是不是第二个
更像方法的调用或者函数的调用,加个`,`是不是更容易分清楚。还有就是在以后写django的配置文件的时候,他是必须要加的。写作习惯
2、select第一个参数他就是监听多个文件句柄,当谁改变了我是不是就可以监听到!
3、select参数里1是超时时间,当到select那一行后,如果这里还是没有输入,那么我就继续走!
'''
I/O多路复用应用案例:利用select实现伪’同时’处理多个socket客户端请求
服务端
# #!/usr/bin/env python
# # -*- coding:utf-8 -*-
# # Author: Liang Lian
# # Python 3.5

import socket
import select
import queue
import subprocess
import json
sk = socket.socket()  # 创建socket对象
sk.bind(('127.0.0.1', 6666,))  # 设置监听的IP与端口
sk.listen(5)  # 设置client最大等待连接数
inputs = [sk, ]  # 需要侦听接收消息的socket对象列表
outputs = []  # 所有给server端发过消息的客户端socket列表,都是需要回消息的
messages = {}  # 接受到的消息
# message的样板信息
# message = {
#    'c1':队列(存放客户端发送过来的消息)
#    'c2':队列,
# }

while True:  # 循环
    rlist, wlist, e = select.select(inputs, outputs, [], 1)
    meu = '''
    inputs(侦听已经链接的socket列表): %s
    rlist(侦听的socket中发生变化的socket列表): %s
    wlist(侦听回消息列表,发生变化的socket列表): %s
    outputs(需要回消息的socket列表): %s
    '''
    print(meu % (len(inputs), len(rlist), len(wlist), len(outputs)))
    # 监听sk(服务器端)对象,如果sk对象发生变化,表示有客户端连接来了,此时rlist值为[sk]
    # 监听connection对象,如果connection发生变化,表示客户端有新消息发过来了,此时rlist的值为[客户端]
    # rlist = [sk,]
    for r in rlist:  # 轮询侦听的socket列表
        if r == sk:  # 如果侦听到是服务端socket发送变化了,说明有新的客户端链接过来了
            connection, address = r.accept()  # 接收客户端对象
            # connection是什么? 其实是客户端socket对象
            inputs.append(connection)  # 加到侦听的socket对象列表中
            messages[connection] = queue.Queue()  # 字典中为这个客户端连接建立一个消息队列
        else:
            '''
            如果侦听到发送变化的socket对象不是服务端自己的socket,那么就是客户端socket变化了,说明客户端那边发消息过来了
            '''
            # 有人给我发消息了
            print("=======")
            ret = r.recv(1024)  # 接收消息
            if ret:
                p = subprocess.Popen(str(ret, encoding='utf-8'),
                shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                res = p.stdout.read()
                if not res:
                    res = 'command error'
                # 解决粘包
                msg_size = len(res)
                response_msg = {'status': 'Ready',
                                'size': msg_size,
                                'content': res}
                response_msg = json.dumps(response_msg)
                outputs.append(r)  # 接收完消息后把客户对象加到回消息列表
                messages[r].put(response_msg)  # 把接受到的消息加到字典里面
            else:
                inputs.remove(r)  # 报错,客户端链接断开,删除侦听的客户端socket对象
    # 所有给我发过消息的人
    for w in wlist:
        try:
            msg = messages[w].get_nowait()  # 去指定队列取数据,并且不阻塞
            msg_dict = json.loads(msg)
            w.sendall(bytes(str(msg_dict['size']), encoding='utf-8'))
            recv_tag = w.recv(1024)
            if str(recv_tag, encoding='utf-8') == 'Start':
                response = bytes(msg_dict['content'], encoding='utf-8')
                w.sendall(response)     # 反馈消息
                outputs.remove(w)  # 从回消息列表中删除客户端socket对象
            else:
                raise Exception('断开连接')
        except Exception as error:  # 发送异常,说明连接中断
            del messages[r]  # 删除接收到的消息

# # rlist = [sk,], rlist=[sk1,],rlist=[sk1,sk2]
# # rlist = []
客户端
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: Liang Lian
# Python 3.5

import socket
import json
# 侦听的IP和端口
ip_port = ('127.0.0.1', 6666)
# 创建socket对象
s = socket.socket()
s.connect(ip_port)

while True:
    # 发送消息
    send_data = input(">>>").strip()
    # 用户如果没有输入,进入到下一步发送给server端,server接收到空就会堵塞不运行
    if len(send_data) == 0:
        continue
    # 用户输入exit退出
    if send_data == 'exit':
        break
    # 发送用户输入的指令到server端
    s.send(bytes(send_data, encoding='utf-8'))

    # 接收消息
    # 接收到对方返回结果的准备消息和要返回内容的长度
    msg_szie = 0
    recv_msg = s.recv(1024)
    if recv_msg:
        msg_size = int(str(recv_msg, encoding='utf-8'))

    # 发送一个'start'给server端,让server端开始发送内容
    start_tag = 'Start'
    s.send(bytes(start_tag, encoding='utf-8'))

    # 通过第一次交互,得知要接受多少数据后,recv_size设置初始值
    recv_size = 0
    recv_msg = b''

    # 当前接收内容小于总接收内容,则一直持续到接收完
    while recv_size < msg_size:
        recv_data = s.recv(1024)

        # 每次接收内容做拼接
        recv_msg += recv_data
        recv_size += len(recv_data)
    print(str(recv_msg, encoding='utf-8'))

s.close()

如上代码实现可接收接收多个客户端请求和客户端请求的读写分离,但实际上也是一个个处理的,只是把客户端的链接请求和发送消息在服务端分开处理了,分别把变化的记录到list中,然后挨个切换处理,如果同时来大量的话还是会有挂起等待的现象,但是相对原生socket那种原子操作(从连接建立到处理完毕断开连接,整个过程都占用了服务器所有资源)要好很多.

异步I/O(asynchronous IO)

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它收到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
用异步IO实现的服务器这里就不举例了,以后有时间另开文章来讲述。异步IO是真正非阻塞的,它不会对请求进程产生任何的阻塞,因此对高并发的网络服务器实现至关重要。

区别

到目前为止,已经将四个IO模型都介绍完了。现在回过头来看下面的问题:

1、blocking和non-blocking的区别在哪?
2、synchronous IO和asynchronous IO的区别在哪?

回答:

  • 1、blocking和non-blocking的区别在哪?

    blockingnon-blocking。前面的介绍中其实已经很明确的说明了这两者的区别。调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还在准备数据的情况下会立刻返回。

I/O Model I/O Model

  • 2、synchronous IO和asynchronous IO的区别在哪?
    在说明synchronous IOasynchronous IO的区别之前,需要先给出两者的定义。Stevens给出的定义(其实是POSIX的定义)是这样子的:

    A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
    An asynchronous I/O operation does not cause the requesting process to be blocked;

两者的区别就在于synchronous IO做IO 操作的时候会将进程阻塞。按照这个定义,之前所述的blocking IOnon-blocking IO,IO 多路复用都属于synchronous IO。有人可能会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个系统调用。

  • non-blocking IO在执行recvfrom这个系统调用的时候,如果kernel的数据没有准备好,这时候不会block进程。但是当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内进程是被block的。

I/O Model

而asynchronous IO则不一样,当进程发起IO操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。

I/O多路复用的应用场景

  • (1)当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用I/O复用。
  • (2)当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。
  • (3)如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。
  • (4)如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用。
  • (5)如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。
  • 与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。

参考资料

文章目录
  1. 1. 回顾原生Socket
  2. 2. 网络IO阻塞模型
    1. 2.1. 什么是I/O
    2. 2.2. Blocking I/O Model
    3. 2.3. 非阻塞:non-blocking IO
    4. 2.4. 多路复用IO(IO multiplexing)
      1. 2.4.1. 对于Select
        1. 2.4.1.1. 利用select监听终端操作实例:
          1. 2.4.1.1.1. I/O多路复用应用案例:利用select实现伪’同时’处理多个socket客户端请求
          2. 2.4.1.1.2. 服务端
          3. 2.4.1.1.3. 客户端
    5. 2.5. 异步I/O(asynchronous IO)
  3. 3. 区别
  4. 4. I/O多路复用的应用场景
  5. 5. 参考资料
|